package arp.message.kafka;

import java.io.ByteArrayInputStream;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.nustaq.serialization.FSTConfiguration;
import org.nustaq.serialization.FSTObjectInput;

import arp.process.publish.Message;

public class FSTDeserializationStrategy implements
		KafkaMessageDeserializationStrategy<byte[]> {

	private FSTConfiguration fstConf;

	public FSTDeserializationStrategy() {
		fstConf = FSTConfiguration.createJsonConfiguration();
		fstConf.setForceSerializable(true);
	}

	@Override
	public void configValueDeserializerClass(Properties props) {
		props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
				ByteArrayDeserializer.class);
	}

	@Override
	public Message deserialize(ConsumerRecord<String, byte[]> record)
			throws Exception {
		byte[] msg = record.value();
		FSTObjectInput ois = fstConf.getObjectInput(new ByteArrayInputStream(
				msg));
		return (Message) ois.readObject();
	}
}
